package fun.easycode.datastream;

import fun.easycode.jointblock.core.JointBlockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 全量数据处理生产者
 *  因为流式查询的数据是一条一条的，所以需要将数据组装成一个List，然后放入队列中
 *  由消费者从队列中取出数据，然后进行处理，只存在一个生产者，是1对多的关系
 * @author xuzhen97
 * @param <I>
 */
@Slf4j
public class DataCompleteProducer<I> implements Runnable {

    private final IDataProcessor<I> processor;
    private final DataCompleteExecutor dataCompleteExecutor;
    private final String querySql;
    private final DataCompleteProducerFinishCallback callback;

    private final BlockingQueue<List<I>> queue = new ArrayBlockingQueue<>(10);
    /**
     * 全量数据处理完成订阅者
     */
    private final List<DataCompleteProducerFinishNotify> finishSubscribers = new ArrayList<>();

    /**
     * 消费者数量
     */
    private static final int CONSUMER_SIZE = 4;

    /**
     * 缓存大小
     */
    private static final int CACHE_SIZE = 1000;

    /**
     * 构造函数
     *
     * @param processor            数据处理器
     * @param dataCompleteExecutor 全量数据处理器
     * @param querySql             查询sql
     * @param callback             全量数据处理完成回调
     */
    public DataCompleteProducer(IDataProcessor<I> processor
            , DataCompleteExecutor dataCompleteExecutor
            , String querySql
            , DataCompleteProducerFinishCallback callback) {
        this.processor = processor;
        this.dataCompleteExecutor = dataCompleteExecutor;
        this.querySql = querySql;
        this.callback = callback;
    }

    /**
     * 添加全量数据处理完成订阅者
     * @param dataCompleteProducerFinishNotify 全量数据处理完成订阅者
     */
    public void addFinishSubscriber(DataCompleteProducerFinishNotify dataCompleteProducerFinishNotify){
        finishSubscribers.add(dataCompleteProducerFinishNotify);
    }

    /**
     * 通知全量数据处理完成订阅者
     */
    private void notifyFinish(){
        finishSubscribers.forEach(DataCompleteProducerFinishNotify::finish);
    }

    /**
     * 生产者线程执行逻辑
     */
    private void produce(){
        JointBlockMapper<I> jointBlockMapper = DataContext.getMapper(processor.getParam().getInputMapper());

        List<Future<List<String>>> futures = new ArrayList<>();

        // 启动消费者线程
        for (int i = 0; i < CONSUMER_SIZE; i++) {
            DataCompleteConsumer<I> consumer = new DataCompleteConsumer<>(queue, processor);
            this.addFinishSubscriber(consumer);
            futures.add(dataCompleteExecutor.submitTask(consumer));
        }

        List<I> cache = new ArrayList<>(CACHE_SIZE);

        AtomicLong totalSize = new AtomicLong(0);

        // 流式查询，将查询结果每CACHE_SIZE条数据放入队列中
        jointBlockMapper.streamQuerySql(StringUtils.isEmpty(querySql)?"":querySql, resultContext -> {
            I resultObject = resultContext.getResultObject();
            cache.add(resultObject);
            totalSize.incrementAndGet();

            if (cache.size() >= CACHE_SIZE) {
                try {
                    queue.put(Collections.synchronizedList(new ArrayList<>(cache)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cache.clear();
            }
        });
        // 将剩余的数据放入队列中
        if (cache.size() > 0) {
            try {
                queue.put(Collections.synchronizedList(new ArrayList<>(cache)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 通知消费者，数据已经全部放入队列中, 处理完成可以结束任务
        notifyFinish();

        List<String> errorMsgList = futures.stream().flatMap(future -> {
            try {
                return future.get().stream();
            } catch (Exception e) {
                log.error("消费者处理异常：{}", e.getMessage());
                return Stream.of("消费者处理异常：" + e.getMessage());
            }
        }).collect(Collectors.toList());

        // 执行完成回调
        if(callback != null){
            callback.callback(totalSize.get(), errorMsgList);
        }
    }

    @Override
    public void run() {
        try{
            // 使用分布式锁，主要是和增量之间的数据处理互斥
            // 增量过程不能全量
            // 全量过程不能增量
            DataContext.tryLock(processor, this::produce);
        }catch (Exception e){
            // 如果生产逻辑报错，一定要通知到监听的消费者停止消费逻辑终止程序
            e.printStackTrace();
            notifyFinish();
            callback.callback(0, Collections.singletonList(e.getMessage()));
        }
    }
}
